Skip to content

#2250 dragonfruit #2360

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed

Conversation

swyxio
Copy link

@swyxio swyxio commented Aug 7, 2025

special pr for eric


Summary of Changes

I successfully implemented the feature to add concurrencyKey to the task run context. Here's a comprehensive overview of the changes made:

1. Schema Updates (packages/core/src/v3/schemas/common.ts)

  • Added concurrencyKey: z.string().optional() to the V3TaskRun schema (line 378)
  • Added concurrencyKey: z.string().optional() to the TaskRun schema (line 225) for backward compatibility

2. Database Query Updates (apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts)

  • Added concurrencyKey: true to the AttemptForExecutionGetPayload.select.taskRun.select (line 1642)
  • Added concurrencyKey: taskRun.concurrencyKey ?? undefined to the execution object creation (line 1726)

3. Execution Context Updates (apps/webapp/app/v3/services/createTaskRunAttempt.server.ts)

  • Added concurrencyKey: taskRun.concurrencyKey ?? undefined to the execution object creation (line 223)
  • Note: The database query in this file already included all fields via include, so no query changes were needed

4. UI Context Updates (apps/webapp/app/routes/resources.runs.$runParam.ts)

  • Added concurrencyKey: run.concurrencyKey ?? undefined to the context object creation (line 192)
  • Note: The database query already selected concurrencyKey: true (line 55)

5. Test Coverage (packages/core/test/taskExecutor.test.ts)

  • Added a new test case to verify that concurrencyKey is properly included in the task context (lines 947-967)
  • Updated the executeTask helper function to accept an optional concurrencyKey parameter (line 1872)
  • Updated the test execution object to include the concurrencyKey field (line 1930)

How the Implementation Works

  1. Triggering: When a task is triggered with a concurrencyKey, it's stored in the database TaskRun.concurrencyKey field (this was already working)

  2. Database Retrieval: When creating task executions, the concurrencyKey is now selected from the database in all relevant queries

  3. Context Building: The concurrencyKey is included in the V3TaskRunExecution.run object when building the execution context

  4. Schema Validation: The updated V3TaskRun and TaskRun schemas now include the concurrencyKey field, ensuring type safety

  5. Context Parsing: The existing context parsing logic in the CLI workers (TaskRunContext.parse(execution)) automatically handles the new field since it's part of the schema

Validation

The implementation follows the existing patterns in the codebase:

  • Database field exists: concurrencyKey is already stored in the TaskRun table
  • Schema consistency: Both V3TaskRun and TaskRun schemas updated
  • Query updates: All database queries that build execution contexts now select concurrencyKey
  • Execution object creation: All locations that create execution objects now include concurrencyKey
  • Test coverage: Added test to verify the feature works end-to-end
  • Type safety: TypeScript types are automatically inferred from the Zod schemas

Usage Example

Now developers can access the concurrencyKey in their tasks:

export const myTask = task({
  id: "my-task",
  run: async (payload, { ctx }) => {
    // The concurrencyKey is now available in the context
    console.log("Concurrency key:", ctx.run.concurrencyKey); // "user-123"
    
    // Use it for logging, metrics, or business logic
    if (ctx.run.concurrencyKey) {
      // Handle user-specific logic
    }
  }
});

// When triggering:
await myTask.trigger(payload, {
  concurrencyKey: "user-123",
  queue: "user-queue"
});

The implementation is now complete and ready for use! The concurrencyKey will be properly passed through from the trigger options to the task context, making it available to developers in their task implementations.

Copy link

changeset-bot bot commented Aug 7, 2025

⚠️ No Changeset found

Latest commit: 2ae83b3

Merging this PR will not cause a version bump for any packages. If these changes should not result in a new version, you're good to go. If these changes should result in a version bump, you need to add a changeset.

Click here to learn what changesets are, and how to add one.

Click here if you're a maintainer who wants to add a changeset to this PR

Copy link
Contributor

coderabbitai bot commented Aug 7, 2025

Walkthrough

The changes introduce the optional concurrencyKey property to various objects and schemas related to task execution and run contexts. This property is now included in the loader's return value, database selection sets, execution payloads, and schema definitions for TaskRun and V3TaskRun. Additionally, the test suite for task execution has been updated to verify the correct propagation of the concurrencyKey within the task context, and the test utility function signature was modified to accept this parameter.

Estimated code review effort

🎯 2 (Simple) | ⏱️ ~8 minutes

  • The changes are straightforward, primarily involving the addition and propagation of a new optional property across several modules, schemas, and tests.
  • No complex logic, error handling, or control flow modifications are present.
  • The test update introduces a new case and a minor function signature change, slightly increasing review complexity above trivial.

Note

⚡️ Unit Test Generation is now available in beta!

Learn more here, or try it out under "Finishing Touches" below.

✨ Finishing Touches
  • 📝 Generate Docstrings
🧪 Generate unit tests
  • Create PR with unit tests
  • Post copyable unit tests in a comment

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share
🪧 Tips

Chat

There are 3 ways to chat with CodeRabbit:

  • Review comments: Directly reply to a review comment made by CodeRabbit. Example:
    • I pushed a fix in commit <commit_id>, please review it.
    • Explain this complex logic.
    • Open a follow-up GitHub issue for this discussion.
  • Files and specific lines of code (under the "Files changed" tab): Tag @coderabbitai in a new review comment at the desired location with your query. Examples:
    • @coderabbitai explain this code block.
  • PR comments: Tag @coderabbitai in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
    • @coderabbitai gather interesting stats about this repository and render them as a table. Additionally, render a pie chart showing the language distribution in the codebase.
    • @coderabbitai read src/utils.ts and explain its main purpose.
    • @coderabbitai read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.

Support

Need help? Create a ticket on our support page for assistance with any issues or questions.

CodeRabbit Commands (Invoked using PR comments)

  • @coderabbitai pause to pause the reviews on a PR.
  • @coderabbitai resume to resume the paused reviews.
  • @coderabbitai review to trigger an incremental review. This is useful when automatic reviews are disabled for the repository.
  • @coderabbitai full review to do a full review from scratch and review all the files again.
  • @coderabbitai summary to regenerate the summary of the PR.
  • @coderabbitai generate docstrings to generate docstrings for this PR.
  • @coderabbitai generate sequence diagram to generate a sequence diagram of the changes in this PR.
  • @coderabbitai generate unit tests to generate unit tests for this PR.
  • @coderabbitai resolve resolve all the CodeRabbit review comments.
  • @coderabbitai configuration to show the current CodeRabbit configuration for the repository.
  • @coderabbitai help to get help.

Other keywords and placeholders

  • Add @coderabbitai ignore anywhere in the PR description to prevent this PR from being reviewed.
  • Add @coderabbitai summary to generate the high-level summary at a specific location in the PR description.
  • Add @coderabbitai anywhere in the PR title to generate the title automatically.

CodeRabbit Configuration File (.coderabbit.yaml)

  • You can programmatically configure CodeRabbit by adding a .coderabbit.yaml file to the root of your repository.
  • Please see the configuration documentation for more information.
  • If your editor has YAML language server enabled, you can add the path at the top of this file to enable auto-completion and validation: # yaml-language-server: $schema=https://coderabbit.ai/integrations/schema.v2.json

Documentation and Community

  • Visit our Documentation for detailed information on how to use CodeRabbit.
  • Join our Discord Community to get help, request features, and share feedback.
  • Follow us on X/Twitter for updates and announcements.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

🧹 Nitpick comments (1)
packages/core/test/taskExecutor.test.ts (1)

947-967: LGTM! Consider adding edge case coverage.

The test case correctly validates that concurrencyKey is properly passed through to the task context. The implementation follows existing patterns and adequately tests the happy path.

Consider adding a complementary test case to verify behavior when concurrencyKey is undefined:

+  test("should handle undefined concurrencyKey in task context", async () => {
+    let capturedContext: any;
+
+    const task = {
+      id: "test-task",
+      fns: {
+        run: async (payload: any, params: RunFnParams<any>) => {
+          capturedContext = params.ctx;
+          return {
+            output: "test-output",
+          };
+        },
+      },
+    };
+
+    const result = await executeTask(task, { test: "data" });
+
+    expect(result.result.ok).toBe(true);
+    expect(capturedContext).toBeDefined();
+    expect(capturedContext.run.concurrencyKey).toBeUndefined();
+  });
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 3cd7dd8 and 2ae83b3.

📒 Files selected for processing (5)
  • apps/webapp/app/routes/resources.runs.$runParam.ts (1 hunks)
  • apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts (2 hunks)
  • apps/webapp/app/v3/services/createTaskRunAttempt.server.ts (1 hunks)
  • packages/core/src/v3/schemas/common.ts (2 hunks)
  • packages/core/test/taskExecutor.test.ts (3 hunks)
🧰 Additional context used
📓 Path-based instructions (4)
**/*.{ts,tsx}

📄 CodeRabbit Inference Engine (.github/copilot-instructions.md)

**/*.{ts,tsx}: Always prefer using isomorphic code like fetch, ReadableStream, etc. instead of Node.js specific code
For TypeScript, we usually use types over interfaces
Avoid enums
No default exports, use function declarations

Files:

  • apps/webapp/app/routes/resources.runs.$runParam.ts
  • apps/webapp/app/v3/services/createTaskRunAttempt.server.ts
  • packages/core/test/taskExecutor.test.ts
  • packages/core/src/v3/schemas/common.ts
  • apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts
{packages/core,apps/webapp}/**/*.{ts,tsx}

📄 CodeRabbit Inference Engine (.github/copilot-instructions.md)

We use zod a lot in packages/core and in the webapp

Files:

  • apps/webapp/app/routes/resources.runs.$runParam.ts
  • apps/webapp/app/v3/services/createTaskRunAttempt.server.ts
  • packages/core/test/taskExecutor.test.ts
  • packages/core/src/v3/schemas/common.ts
  • apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts
apps/webapp/**/*.{ts,tsx}

📄 CodeRabbit Inference Engine (.cursor/rules/webapp.mdc)

apps/webapp/**/*.{ts,tsx}: In the webapp, all environment variables must be accessed through the env export of env.server.ts, instead of directly accessing process.env.
When importing from @trigger.dev/core in the webapp, never import from the root @trigger.dev/core path; always use one of the subpath exports as defined in the package's package.json.

Files:

  • apps/webapp/app/routes/resources.runs.$runParam.ts
  • apps/webapp/app/v3/services/createTaskRunAttempt.server.ts
  • apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts
**/*.test.{ts,tsx}

📄 CodeRabbit Inference Engine (.github/copilot-instructions.md)

Our tests are all vitest

Files:

  • packages/core/test/taskExecutor.test.ts
🧠 Learnings (14)
📓 Common learnings
Learnt from: CR
PR: triggerdotdev/trigger.dev#0
File: .cursor/rules/writing-tasks.mdc:0-0
Timestamp: 2025-07-18T17:50:25.014Z
Learning: Applies to **/trigger/**/*.{ts,tsx,js,jsx} : When implementing schema tasks, use `schemaTask` from `trigger.dev/sdk/v3` and validate payloads as shown.
📚 Learning: in apps/webapp/app/services/runsrepository.server.ts, the in-memory status filtering after fetching ...
Learnt from: matt-aitken
PR: triggerdotdev/trigger.dev#2264
File: apps/webapp/app/services/runsRepository.server.ts:172-174
Timestamp: 2025-07-12T18:06:04.133Z
Learning: In apps/webapp/app/services/runsRepository.server.ts, the in-memory status filtering after fetching runs from Prisma is intentionally used as a workaround for ClickHouse data delays. This approach is acceptable because the result set is limited to a maximum of 100 runs due to pagination, making the performance impact negligible.

Applied to files:

  • apps/webapp/app/routes/resources.runs.$runParam.ts
📚 Learning: do not use or add new code to the legacy run engine; focus on using and migrating to run engine 2.0 ...
Learnt from: CR
PR: triggerdotdev/trigger.dev#0
File: .cursor/rules/webapp.mdc:0-0
Timestamp: 2025-07-18T17:49:47.180Z
Learning: Do not use or add new code to the legacy run engine; focus on using and migrating to Run Engine 2.0 in `internal/run-engine`.

Applied to files:

  • apps/webapp/app/routes/resources.runs.$runParam.ts
📚 Learning: applies to **/trigger/**/*.{ts,tsx,js,jsx} : the `run` function contains your task logic in trigger....
Learnt from: CR
PR: triggerdotdev/trigger.dev#0
File: .cursor/rules/writing-tasks.mdc:0-0
Timestamp: 2025-07-18T17:50:25.014Z
Learning: Applies to **/trigger/**/*.{ts,tsx,js,jsx} : The `run` function contains your task logic in Trigger.dev tasks.

Applied to files:

  • apps/webapp/app/routes/resources.runs.$runParam.ts
  • apps/webapp/app/v3/services/createTaskRunAttempt.server.ts
  • packages/core/test/taskExecutor.test.ts
  • packages/core/src/v3/schemas/common.ts
📚 Learning: applies to **/trigger/**/*.{ts,tsx,js,jsx} : when using realtime features, use the `runs.subscribeto...
Learnt from: CR
PR: triggerdotdev/trigger.dev#0
File: .cursor/rules/writing-tasks.mdc:0-0
Timestamp: 2025-07-18T17:50:25.014Z
Learning: Applies to **/trigger/**/*.{ts,tsx,js,jsx} : When using Realtime features, use the `runs.subscribeToRun`, `runs.subscribeToRunsWithTag`, and `runs.subscribeToBatch` APIs as shown.

Applied to files:

  • apps/webapp/app/routes/resources.runs.$runParam.ts
📚 Learning: applies to **/trigger/**/*.{ts,tsx,js,jsx} : when using metadata in tasks, use the `metadata` api as...
Learnt from: CR
PR: triggerdotdev/trigger.dev#0
File: .cursor/rules/writing-tasks.mdc:0-0
Timestamp: 2025-07-18T17:50:25.014Z
Learning: Applies to **/trigger/**/*.{ts,tsx,js,jsx} : When using metadata in tasks, use the `metadata` API as shown, and only inside run functions or task lifecycle hooks.

Applied to files:

  • apps/webapp/app/v3/services/createTaskRunAttempt.server.ts
  • packages/core/test/taskExecutor.test.ts
📚 Learning: applies to **/trigger/**/*.{ts,tsx,js,jsx} : when using retry, queue, machine, or maxduration option...
Learnt from: CR
PR: triggerdotdev/trigger.dev#0
File: .cursor/rules/writing-tasks.mdc:0-0
Timestamp: 2025-07-18T17:50:25.014Z
Learning: Applies to **/trigger/**/*.{ts,tsx,js,jsx} : When using retry, queue, machine, or maxDuration options, configure them as shown in the examples for Trigger.dev tasks.

Applied to files:

  • apps/webapp/app/v3/services/createTaskRunAttempt.server.ts
  • packages/core/test/taskExecutor.test.ts
  • apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts
📚 Learning: in `packages/core/src/v3/errors.ts`, within the `taskrunerrorenhancer` function, `error.message` is ...
Learnt from: nicktrn
PR: triggerdotdev/trigger.dev#1418
File: packages/core/src/v3/errors.ts:364-371
Timestamp: 2024-10-18T15:41:52.352Z
Learning: In `packages/core/src/v3/errors.ts`, within the `taskRunErrorEnhancer` function, `error.message` is always defined, so it's safe to directly call `error.message.includes("SIGTERM")` without additional checks.

Applied to files:

  • packages/core/test/taskExecutor.test.ts
  • packages/core/src/v3/schemas/common.ts
  • apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts
📚 Learning: applies to **/trigger/**/*.{ts,tsx,js,jsx} : tasks must be exported, even subtasks in the same file....
Learnt from: CR
PR: triggerdotdev/trigger.dev#0
File: .cursor/rules/writing-tasks.mdc:0-0
Timestamp: 2025-07-18T17:50:25.014Z
Learning: Applies to **/trigger/**/*.{ts,tsx,js,jsx} : Tasks must be exported, even subtasks in the same file.

Applied to files:

  • packages/core/test/taskExecutor.test.ts
📚 Learning: applies to **/trigger/**/*.{ts,tsx,js,jsx} : if you are able to generate an example payload for a ta...
Learnt from: CR
PR: triggerdotdev/trigger.dev#0
File: .cursor/rules/writing-tasks.mdc:0-0
Timestamp: 2025-07-18T17:50:25.014Z
Learning: Applies to **/trigger/**/*.{ts,tsx,js,jsx} : If you are able to generate an example payload for a task, do so.

Applied to files:

  • packages/core/test/taskExecutor.test.ts
📚 Learning: applies to **/trigger/**/*.{ts,tsx,js,jsx} : when implementing schema tasks, use `schematask` from `...
Learnt from: CR
PR: triggerdotdev/trigger.dev#0
File: .cursor/rules/writing-tasks.mdc:0-0
Timestamp: 2025-07-18T17:50:25.014Z
Learning: Applies to **/trigger/**/*.{ts,tsx,js,jsx} : When implementing schema tasks, use `schemaTask` from `trigger.dev/sdk/v3` and validate payloads as shown.

Applied to files:

  • packages/core/src/v3/schemas/common.ts
📚 Learning: applies to **/trigger/**/*.{ts,tsx,js,jsx} : when implementing scheduled (cron) tasks, use `schedule...
Learnt from: CR
PR: triggerdotdev/trigger.dev#0
File: .cursor/rules/writing-tasks.mdc:0-0
Timestamp: 2025-07-18T17:50:25.014Z
Learning: Applies to **/trigger/**/*.{ts,tsx,js,jsx} : When implementing scheduled (cron) tasks, use `schedules.task` from `trigger.dev/sdk/v3` and follow the shown patterns.

Applied to files:

  • packages/core/src/v3/schemas/common.ts
  • apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts
📚 Learning: applies to **/trigger/**/*.{ts,tsx,js,jsx} : always generate trigger.dev tasks using the `task` func...
Learnt from: CR
PR: triggerdotdev/trigger.dev#0
File: .cursor/rules/writing-tasks.mdc:0-0
Timestamp: 2025-07-18T17:50:25.014Z
Learning: Applies to **/trigger/**/*.{ts,tsx,js,jsx} : ALWAYS generate Trigger.dev tasks using the `task` function from `trigger.dev/sdk/v3` and export them as shown in the correct pattern.

Applied to files:

  • packages/core/src/v3/schemas/common.ts
📚 Learning: applies to **/trigger/**/*.{ts,tsx,js,jsx} : you must use `@trigger.dev/sdk/v3` when writing trigger...
Learnt from: CR
PR: triggerdotdev/trigger.dev#0
File: .cursor/rules/writing-tasks.mdc:0-0
Timestamp: 2025-07-18T17:50:25.014Z
Learning: Applies to **/trigger/**/*.{ts,tsx,js,jsx} : You MUST use `trigger.dev/sdk/v3` when writing Trigger.dev tasks.

Applied to files:

  • packages/core/src/v3/schemas/common.ts
🔇 Additional comments (8)
apps/webapp/app/routes/resources.runs.$runParam.ts (1)

192-192: LGTM! Consistent implementation following existing patterns.

The addition of concurrencyKey to the context follows the same pattern as other optional fields and correctly handles null values.

apps/webapp/app/v3/services/createTaskRunAttempt.server.ts (1)

223-223: LGTM! Proper propagation of concurrencyKey to execution context.

The addition correctly includes the concurrencyKey in the execution payload, maintaining consistency with the existing codebase patterns.

packages/core/src/v3/schemas/common.ts (2)

225-225: LGTM! Schema definition correctly adds optional concurrencyKey field.

The addition of concurrencyKey as an optional string field to the TaskRun schema is appropriate and consistent with the feature requirements.


379-379: LGTM! Consistent schema update for V3TaskRun.

The addition maintains consistency between TaskRun and V3TaskRun schemas, ensuring the concurrencyKey field is available in both versions.

apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts (2)

1642-1642: LGTM! Database selection includes concurrencyKey field.

The addition ensures that the concurrencyKey is retrieved from the database when constructing execution payloads, which is essential for the feature to work end-to-end.


1726-1726: LGTM! Execution payload properly includes concurrencyKey.

The addition correctly propagates the concurrencyKey from the database query into the execution payload, completing the end-to-end flow for this feature.

packages/core/test/taskExecutor.test.ts (2)

871-872: LGTM! Clean function signature extension.

The optional concurrencyKey parameter is properly added while maintaining backward compatibility. The placement at the end of the parameter list follows good API design practices.


1930-1930: LGTM! Proper execution object assignment.

The concurrencyKey is correctly assigned to the execution.run object, maintaining consistency with other run properties and aligning with the schema updates.

@ericallam ericallam closed this Aug 13, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants